{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "<!--\n",
    "    Licensed to the Apache Software Foundation (ASF) under one\n",
    "    or more contributor license agreements.  See the NOTICE file\n",
    "    distributed with this work for additional information\n",
    "    regarding copyright ownership.  The ASF licenses this file\n",
    "    to you under the Apache License, Version 2.0 (the\n",
    "    \"License\"); you may not use this file except in compliance\n",
    "    with the License.  You may obtain a copy of the License at\n",
    "\n",
    "      http://www.apache.org/licenses/LICENSE-2.0\n",
    "\n",
    "    Unless required by applicable law or agreed to in writing,\n",
    "    software distributed under the License is distributed on an\n",
    "    \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
    "    KIND, either express or implied.  See the License for the\n",
    "    specific language governing permissions and limitations\n",
    "    under the License.\n",
    "-->\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Explore data from covidtracking.com\n",
    "The data set is relatively small and used as a demonstration of working with Beam in an interactive notebook environment.\n",
    "\n",
    "There are two ways to get the data:\n",
    "\n",
    "- Get json data from APIs.\n",
    "- Download data in csv files directly.\n",
    "\n",
    "We'll have a batch Beam pipeline example utilizing either method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import requests\n",
    "\n",
    "json_current='https://covidtracking.com/api/v1/states/current.json'\n",
    "json_historical='https://covidtracking.com/api/v1/states/daily.json'\n",
    "\n",
    "def get_json_data(url):\n",
    "  with requests.Session() as session:\n",
    "    data = json.loads(session.get(url).text)\n",
    "  return data\n",
    "\n",
    "csv_current = 'https://covidtracking.com/api/v1/states/current.csv'\n",
    "csv_historical = 'https://covidtracking.com/api/v1/states/daily.csv'\n",
    "\n",
    "def download_csv(url, filename):\n",
    "  if not filename.endswith('.csv'):\n",
    "    filename = filename + '.csv'\n",
    "  with requests.Session() as session:\n",
    "    with open(filename, 'wb') as f:\n",
    "      f.write(session.get(url).content)\n",
    "  return filename"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Below reads data into memory as json."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "current_data = get_json_data(json_current)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Below downloads data in csv format stored in files."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "csv_file_current = download_csv(csv_current, 'current')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Prepare some Apache Beam dependencies."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "from apache_beam.runners.interactive import interactive_beam as ib\n",
    "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a Beam pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "p = beam.Pipeline(runner=InteractiveRunner())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can create a PCollection from either in-memory json data or data in files."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "current_data_from_json = p | 'Create PCollection from json' >> beam.Create(current_data)\n",
    "current_data_from_files = p | 'Create PCollection from files' >> beam.io.ReadFromText(csv_file_current, skip_header_lines=1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The in-memory json data is already structured."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(current_data_from_json)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The data from files read as plain text is not structured, we'll have to handle it.\n",
    "\n",
    "For a batch pipeline reading files with huge content size, it's normal to read source data from files and let Beam handle the work load."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(current_data_from_files)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We'll parse the plain texts into structured data with Beam SDK."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from csv import reader\n",
    "\n",
    "def read_headers(csv_file):\n",
    "  with open(csv_file, 'r') as f:\n",
    "    header_line = f.readline().strip()\n",
    "  return next(reader([header_line]))\n",
    "\n",
    "current_data_headers = read_headers(csv_file_current)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from collections import namedtuple\n",
    "\n",
    "UsCovidData = namedtuple('UsCovidData', current_data_headers)\n",
    "\n",
    "class UsCovidDataCsvReader(beam.DoFn):\n",
    "  def __init__(self, schema):\n",
    "    self._schema = schema\n",
    "    \n",
    "  def process(self, element):\n",
    "    values = [int(val) if val.isdigit() else val for val in next(reader([element]))]\n",
    "    return [self._schema(*values)]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "current_data = current_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(UsCovidData))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(current_data)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With Interactive Beam, you can collect a PCollection into a pandas dataframe. It's useful when you just want to play with small test data sets locally on a single machine."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = ib.collect(current_data)\n",
    "df.describe()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's take a deeper look into the data with the visualization feature of Interactive Beam and come up with some tasks."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(current_data, visualize_data=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can find out that NY currently has the most positive COVID cases with above facets visualization because the data set is small (for demo).\n",
    "\n",
    "Now we can write a beam transform to try to get that same conclusion of which state has the highest positive number currently."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from functools import total_ordering\n",
    "\n",
    "@total_ordering\n",
    "class UsCovidDataOrderByPositive:\n",
    "  def __init__(self, data):\n",
    "    self._data = data\n",
    "  \n",
    "  def __gt__(self, other):\n",
    "    return self._data.positive > other._data.positive\n",
    "\n",
    "\n",
    "def maximum_positive(values):\n",
    "  return max(values) if values else None\n",
    "\n",
    "max_positive = (current_data \n",
    "                | 'Data OrderByPositive' >> beam.Map(lambda data: UsCovidDataOrderByPositive(data))\n",
    "                | 'Find Maximum Positive' >> beam.CombineGlobally(maximum_positive)\n",
    "                | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(max_positive)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can also try to come up with the total positive case number in the US."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "total_positive = (current_data\n",
    "                  | 'Positive Per State' >> beam.Map(lambda data: data.positive)\n",
    "                  | 'Total Positive' >> beam.CombineGlobally(sum))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(total_positive)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's look at some more complicated data: the historical data.\n",
    "\n",
    "It contains similar data to current for each day until current day."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "csv_file_historical = download_csv(csv_historical, 'historical')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "historical_data_from_files = p | 'Create PCollection for historical data from files' >> beam.io.ReadFromText(csv_file_historical, skip_header_lines=1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(historical_data_from_files)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "historical_data_headers = read_headers(csv_file_historical)\n",
    "\n",
    "HistoricalUsCovidData = namedtuple('HistoricalUsCovidData', historical_data_headers)\n",
    "\n",
    "historical_data = historical_data_from_files | 'Parse' >> beam.ParDo(UsCovidDataCsvReader(HistoricalUsCovidData))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(historical_data)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For demostration, let's just take a look at NY throughout the timeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class FilterByState(beam.DoFn):\n",
    "  def __init__(self, state):\n",
    "    self._state = state\n",
    "    \n",
    "  def process(self, element):\n",
    "    if element.state == self._state:\n",
    "      yield element\n",
    "\n",
    "ny_data = historical_data | 'Filter NY' >> beam.ParDo(FilterByState('NY'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then we do a visualization to see if there is anything worth looking for."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(ny_data, visualize_data=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "There happens to be a field named `positiveIncrease`. If not, we'll need to write some transforms to deduce the per day positive increment value.\n",
    "\n",
    "Now let's try to find out the date with the most `positiveIncrease` for NY."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@total_ordering\n",
    "class UsCovidDataOrderByPositiveIncrease:\n",
    "  def __init__(self, data):\n",
    "    self._data = data\n",
    "  \n",
    "  def __gt__(self, other):\n",
    "    self_positive_increase = self._data.positiveIncrease if self._data.positiveIncrease else 0\n",
    "    other_positive_increase = other._data.positiveIncrease if other._data.positiveIncrease else 0\n",
    "    return self_positive_increase > other_positive_increase\n",
    "\n",
    "\n",
    "def maximum_positive_increase(values):\n",
    "  return max(values) if values else None\n",
    "\n",
    "worst_day = (ny_data\n",
    "             | 'Order By PositiveIncrease' >> beam.Map(lambda data: UsCovidDataOrderByPositiveIncrease(data))\n",
    "             | 'Maximum Positive Increase' >> beam.CombineGlobally(maximum_positive_increase)\n",
    "             | 'Convert Back to Data' >> beam.Map(lambda orderable_data: orderable_data._data)\n",
    "             | 'Extract Date' >> beam.Map(lambda data: data.date))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ib.show(worst_day)"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
